
Conversation

@johanl-db (Contributor) commented Dec 15, 2023

What changes were proposed in this pull request?

This change adds the following conversions to the vectorized and non-vectorized Parquet readers, corresponding to type promotions that are strictly widening, i.e. without precision loss:

  • Int -> Long
  • Float -> Double
  • Int -> Double
  • Date -> TimestampNTZ (timestamp without timezone only, since a date carries no timezone information)
  • Decimal with higher precision (already supported in the non-vectorized reader)

Why are the changes needed?

These type promotions support two similar use cases:

  1. Reading a set of Parquet files with different types, e.g. a mix of Int and Long for a given column. If the read schema is Long, the reader should be able to read all files and promote Ints to Longs instead of failing (see the sketch below).
  2. Widening the type of a column in a table that already contains Parquet files, e.g. an Int column isn't large enough to accommodate IDs and is changed to Long. Existing Parquet files storing the value as Int should still be correctly read by upcasting values.

The second use case in particular will enable widening the type of columns or fields in existing Delta tables.
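
As a minimal sketch of the first use case (spark-shell style, assuming this change and a hypothetical writable directory `dir`):

val dir = "/tmp/mixed-int-long"                          // hypothetical path
Seq(1, 2).toDF("a").write.parquet(dir)                   // writes `a` as INT32
Seq(3L, 4L).toDF("a").write.mode("append").parquet(dir)  // adds a file with `a` as INT64
// With the widening promotion, the INT32 file is upcast to Long instead of failing:
spark.read.schema("a LONG").parquet(dir).collect()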

Does this PR introduce any user-facing change?

The following fails before this change:

Seq(0).toDF("a").write.parquet(path)
spark.read.schema("a LONG").parquet(path).where(s"a < ${Long.MaxValue}").collect()

With the Int->Long promotion in both the vectorized and non-vectorized Parquet readers, it succeeds and produces correct results, without overflow or loss of precision.
The same holds for Float->Double, Int->Double, Decimal with higher precision, and Date->TimestampNTZ.
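
As a further hedged sketch, the Date -> TimestampNTZ case, reusing the `path` placeholder above; because the target type carries no timezone, no session-timezone shift is involved:

import java.sql.Date

Seq(Date.valueOf("2023-12-15")).toDF("a").write.mode("overwrite").parquet(path)
// Dates widen to midnight timestamps without timezone, e.g. 2023-12-15 00:00:00:
spark.read.schema("a TIMESTAMP_NTZ").parquet(path).collect()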

How was this patch tested?

  • Added ParquetTypeWideningSuite covering the promotions included in this change (see the sketch after this list), in particular:
    • Non-dictionary encoded values / dictionary encoded values for each promotion
    • Timestamp rebase modes LEGACY and CORRECTED for Date -> TimestampNTZ
    • Promotions between decimal types with different physical storage: INT32, INT64, BINARY, FIXED_LEN_BYTE_ARRAY
    • Reading values written with Parquet V1 and V2 writers.
  • Updated/Removed two tests that expect type promotion to fail.
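
A hedged sketch of the shape of these round-trip checks (not the actual suite code; `dir` is again a hypothetical placeholder), exercising both the vectorized and non-vectorized readers on a Float -> Double promotion:

for (vectorized <- Seq("true", "false")) {
  spark.conf.set("spark.sql.parquet.enableVectorizedReader", vectorized)
  Seq(1.5f, 2.5f).toDF("a").write.mode("overwrite").parquet(dir)  // writes `a` as FLOAT
  val result = spark.read.schema("a DOUBLE").parquet(dir).collect()
  // 1.5f and 2.5f widen exactly to their Double counterparts.
  assert(result.map(_.getDouble(0)).sorted.sameElements(Array(1.5, 2.5)))
}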

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions bot added the SQL label on Dec 15, 2023
@johanl-db force-pushed the SPARK-40876-parquet-type-promotion branch from 2a640f4 to a4e8048 on December 15, 2023 13:12
@johanl-db changed the title from [WIP][SPARK-40876] Widening type promotions in Parquet readers to [SPARK-40876] Widening type promotions in Parquet readers on Dec 15, 2023
@johanl-db force-pushed the SPARK-40876-parquet-type-promotion branch from 6ddb60b to c113d8e on December 15, 2023 14:48
@johanl-db force-pushed the SPARK-40876-parquet-type-promotion branch from c113d8e to cb1487e on December 18, 2023 08:54
@johanl-db force-pushed the SPARK-40876-parquet-type-promotion branch from e1c73e7 to dc8b489 on December 18, 2023 13:54
@dongjoon-hyun changed the title from [SPARK-40876] Widening type promotions in Parquet readers to [SPARK-40876][SQL] Widening type promotions in Parquet readers on Dec 18, 2023
boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
return new IntegerWithRebaseUpdater(failIfRebase);
}
} else if (sparkType == DataTypes.TimestampNTZType) {
Contributor: I think we should support timestamp LTZ as well, which is DataTypes.TimestampType.

Contributor: But this means the Parquet reader needs to know the session timezone.

Contributor: BTW, how do we know this INT32 is a logical DATE in Parquet?

Contributor (author): I added a check to only allow reading INT32 with a date annotation.

I took a stab at Date->TimestampLTZ, but it's not trivial and we would need to discuss the expected behavior; I can follow up in a different PR if we decide we need it. It's easy to get wrong, and I'd rather disallow it for now.
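
For illustration, a hedged Scala sketch of such a guard using the parquet-mr schema API — the helper name isAnnotatedDate is hypothetical, not the code added in this PR:

import org.apache.parquet.column.ColumnDescriptor
import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

// Hypothetical guard: only treat an INT32 column as a date (and thus eligible
// for promotion to TimestampNTZ) when it carries the DATE logical type annotation.
def isAnnotatedDate(descriptor: ColumnDescriptor): Boolean = {
  val primitive = descriptor.getPrimitiveType
  primitive.getPrimitiveTypeName == PrimitiveTypeName.INT32 &&
    primitive.getLogicalTypeAnnotation.isInstanceOf[DateLogicalTypeAnnotation]
}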

@johanl-db requested a review from cloud-fan on December 19, 2023 17:20
}
}

test("SPARK-35640: int as long should throw schema incompatible error") {
Contributor: also cc @sunchao

Will this PR solve the problem described in SPARK-35461 too?

@johanl-db (author): @cloud-fan Can you merge this PR?

return new IntegerWithRebaseUpdater(failIfRebase);
}
} else if (sparkType == DataTypes.TimestampNTZType && isDateTypeMatched(descriptor)) {
if ("CORRECTED".equals(datetimeRebaseMode)) {
Contributor: See #44428; NTZ should never rebase.

Contributor: nvm, this is a date, and we need the rebase.

boolean needsUpcast = sparkType == LongType || (isDate && sparkType == TimestampNTZType) ||
!DecimalType.is32BitDecimalType(sparkType);
boolean needsRebase = logicalTypeAnnotation instanceof DateLogicalTypeAnnotation &&
!"CORRECTED".equals(datetimeRebaseMode);
Contributor: ditto

parquetType.getLogicalTypeAnnotation.isInstanceOf[DateLogicalTypeAnnotation] =>
new ParquetPrimitiveConverter(updater) {
override def addInt(value: Int): Unit = {
this.updater.set(DateTimeUtils.daysToMicros(dateRebaseFunc(value), ZoneOffset.UTC))
Contributor: ditto
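
To make the conversion concrete, a hedged sketch (hypothetical helper name) of what the converter above computes: a Parquet DATE is days since the epoch, optionally rebased from the legacy Julian calendar, then scaled to microseconds at midnight UTC:

import java.time.ZoneOffset
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, RebaseDateTime}

// Hypothetical mirror of the converter: rebase LEGACY-calendar days, then widen
// days-since-epoch to microseconds-since-epoch at midnight UTC.
def dateToTimestampNtzMicros(days: Int, legacyRebase: Boolean): Long = {
  val rebased = if (legacyRebase) RebaseDateTime.rebaseJulianToGregorianDays(days) else days
  DateTimeUtils.daysToMicros(rebased, ZoneOffset.UTC)
}

dateToTimestampNtzMicros(0, legacyRebase = false)  // 1970-01-01 -> 0L microseconds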

@cloud-fan (Contributor): thanks, merging to master!

@cloud-fan closed this in 3361f25 on Dec 22, 2023
@LuciferYang (Contributor): There are 3 failed tests in ParquetTypeWideningSuite with spark.sql.ansi.enabled=true, could you take a look? @johanl-db

[info] - unsupported parquet conversion IntegerType -> TimestampType *** FAILED *** (68 milliseconds)
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 261.0 failed 1 times, most recent failure: Lost task 1.0 in stage 261.0 (TID 523) (localhost executor driver): org.apache.spark.SparkNumberFormatException: [CAST_INVALID_INPUT] The value '1.23' of the type "STRING" cannot be cast to "INT" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22018
[info] == DataFrame ==
[info] "cast" was called from
[info] org.apache.spark.sql.execution.datasources.parquet.ParquetTypeWideningSuite.writeParquetFiles(ParquetTypeWideningSuite.scala:113)
[info] 
[info] 	at org.apache.spark.sql.errors.QueryExecutionErrors$.invalidInputInCastToNumberError(QueryExecutionErrors.scala:145)
[info] 	at org.apache.spark.sql.catalyst.util.UTF8StringUtils$.withException(UTF8StringUtils.scala:51)
[info] 	at org.apache.spark.sql.catalyst.util.UTF8StringUtils$.toIntExact(UTF8StringUtils.scala:34)
[info] 	at org.apache.spark.sql.catalyst.util.UTF8StringUtils.toIntExact(UTF8StringUtils.scala)
[info] 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
[info] 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
[info] 	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
[info] 	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:388)
[info] 	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:101)
[info] 	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:891)
[info] 	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:891)
[info] 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
[info] 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
[info] 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
[info] 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
[info] 	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
[info] 	at org.apache.spark.scheduler.Task.run(Task.scala:141)
[info] 	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:628)
[info] 	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
[info] 	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
[info] 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
[info] 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:631)
[info] 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] 	at java.base/java.lang.Thread.run(Thread.java:840)
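
For context, a hedged illustration of the behavior difference behind this failure (spark-shell style): under ANSI mode, casting a malformed string such as '1.23' to INT throws instead of producing a value, which is what writeParquetFiles hit; as the error message suggests, TRY_CAST tolerates it:

spark.conf.set("spark.sql.ansi.enabled", "true")
// spark.sql("SELECT CAST('1.23' AS INT)").collect()  // throws CAST_INVALID_INPUT
spark.sql("SELECT TRY_CAST('1.23' AS INT)").show()    // returns NULL instead of throwing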

@cloud-fan (Contributor): @LuciferYang thanks for catching it! Does it block PR merging? We may need to wait a few days as it's the holiday season. If you can fix it, that's even better. We can revert it first if it affects other PRs.

@LuciferYang (Contributor): @cloud-fan This is not a serious issue, just the ANSI-mode daily test failing; I don't think we need to revert.

@LuciferYang (Contributor):

> @LuciferYang thanks for catching it! Does it block PR merging? We may need to wait for a few days as it's the holiday season. If you can fix it then it's even better. We can revert it first if it affects other PRs.

Just marking this: the issue has been fixed by #44481.
